博客中代码地址:https://github.com/farliu/farpc.git
本文实现的是服务的发现,也就是图片中的第2,3步,经过上一章的服务注册,对于服务发现我们只需要从zookeeper中取得对应的provider就行了。
项目结构介绍
本节涉及博客中代码的两个module,farpc-registry(服务治理)、farpc-cluster(集群管理)。
在上一章的基础上,我们扩展IRegistrar接口,增加discover方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public interface IRegistrar { void init(String registerAddress); void register(String providerAddress, String service); String discover(String service); }
farpc-cluster同样提供类似的接口ILoadbalance。
1 2 3 4 public interface ILoadbalance { String select(List<String> providers); }
服务发现 基于上一章的代码,zookeeper的连接已经在init方法中初始化了。那么本节要实现的服务发现就是,从zookeeper取得某一个服务下的所有节点,也就是provider。一起来看看代码怎么写。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 private static final String SEPARATOR = "/" ;private static final String FOLDER = "/faregistrys" ;private Map<String , List<String >> serviceProviderMap = new HashMap<String , List<String >>();public String discover (String service) { String path = FOLDER + SEPARATOR + service; try { List<String > provider = curatorFramework.getChildren().forPath(path); serviceProviderMap.put (service, provider); watchProvider(path); return serviceProviderMap.get (service).get (0 ); } catch (Exception e) { logger.error(String .format("call ZookeeperRegistrarImpl.discover, occur exception, service:[%s], e.getMessage:[%s]" , service, e.getMessage()), e); return "" ; } private void watchProvider (final String path) { PathChildrenCache childrenCache = new PathChildrenCache(curatorFramework, path, true ); PathChildrenCacheListener listener = new PathChildrenCacheListener() { public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception { serviceProviderMap.put (path, curatorFramework.getChildren().forPath(path)); } }; childrenCache.getListenable().addListener(listener); try { childrenCache.start(); } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); } } }
服务发现的代码也很简单,就是从zookeeper中根据服务取到provider,这里没有做负载均衡,所以始终返回第一个。继续解析一下要注意的地方。
第一点,返回的是一个list,是因为某一个节点路径下肯定是存在多个节点的,也就意味着某一个服务应该有多个provider。
第二点,使用一个map保存,是为了监听zookeeper中provider的变化,也就是watchProvider()方法,给指定的路径添加监听器,当有更新时,更新map中的信息。而这里就是zookeeper提供的监听机制。达到服务动态发现的效果。
同样的我们测试一下代码,记得要先注册服务
1 2 3 4 5 6 7 8 9 10 11 public class ConsumerTest { @Test public void test() throws IOException { IRegistrar registrar = new ZookeeperRegistrarImpl(); registrar.init("127.0.0.1:2181" ); System.out .println(registrar.discover("com.ofcoder.farpc.demo.api.IWelcome" )); } } ------------------------------------------ 127.0 .0 .1 :20880
正确输出zookeeper中管理服务提供者地址。那么基于zookeeper的服务发现也就实现了。
负载均衡 在文中一直有提到,从注册中心拿到的是一个list的provider,那么我们需要做一个类似负载均衡的东西,也就是从众多的provider中,取其中一个来真正调用。这里提供两种方式。
1 2 3 4 5 6 7 8 9 public class RandomLoadbalanceImpl implements ILoadbalance { public String select (List<String > providers) { int len = providers.size (); Random random = new Random(); int lucky = random .nextInt(len); return providers.get (lucky); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class RoundLoadBalanceImpl implements ILoadbalance { private AtomicInteger previous = new AtomicInteger(0 ); public String select (List<String > providers) { int size = providers.size (); if (previous.get () >= size ) { previous.set (0 ); } String provider = providers.get (previous.get ()); previous.set (previous.get () + 1 ); return provider; } }
那么使用的话,我新建了一个抽象类实现IRegistrar,来完成负载均衡.
1 2 3 4 5 6 7 8 9 10 public abstract class AbstractRegistrar implements IRegistrar { public String discover(String service) { List <String > providers = lookup(service); ILoadbalance loadbalance = new RoundLoadBalanceImpl(); String select = loadbalance.select(providers); return select; } public abstract List <String > lookup(String service); }
将之前ZookeeperRegistrarImpl实现AbstractRegistrar,在lookup中完成之前实现的服务发现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class ZookeeperRegistrarImpl extends AbstractRegistrar { private static final Logger logger = LoggerFactory.getLogger(ZookeeperRegistrarImpl.class); private static final int SESSION_TIMEOUT_MS = 5000 ; private static final int SLEEP_TIME_MS = 1000 ; private static final int MAX_RETRIES = 2 ; private static final String SEPARATOR = "/" ; private static final String FOLDER = "/faregistrys" ; private Map<String , List<String >> serviceProviderMap = new HashMap<String , List<String >>(); private CuratorFramework curatorFramework; @Override public List<String > lookup (String service) { String path = FOLDER + SEPARATOR + service; try { List<String > provider = curatorFramework.getChildren().forPath(path); serviceProviderMap.put (service, provider); watchProvider(path); return serviceProviderMap.get (service); } catch (Exception e) { logger.error(String .format("call ZookeeperRegistrarImpl.discover, occur exception, service:[%s], e.getMessage:[%s]" , service, e.getMessage()), e); return null; } } private void watchProvider (final String path) { PathChildrenCache childrenCache = new PathChildrenCache(curatorFramework, path, true ); PathChildrenCacheListener listener = new PathChildrenCacheListener() { public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception { serviceProviderMap.put (path, curatorFramework.getChildren().forPath(path)); } }; childrenCache.getListenable().addListener(listener); try { childrenCache.start(); } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); } } }
以上则为服务发现的所有代码,意在模拟dubbo,而不是照抄dubbo,希望可以帮助大家对dubbo服务治理有一定的了解。
dubbo源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 ZookeeperRegistry.doSubscribe() public void doSubscribe(final URL url, final NotifyListener listener) { try { if (ANY_VALUE.equals(url.getServiceInterface())) { String root = toRootPath(); ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get (url); if (listeners == null ) { zkListeners.putIfAbsent(url, new ConcurrentHashMap<>()); listeners = zkListeners.get (url); } ChildListener zkListener = listeners.get (listener); if (zkListener == null ) { listeners.putIfAbsent(listener, (parentPath, currentChilds) -> { for (String child : currentChilds) { child = URL.decode(child); if (!anyServices.contains(child)) { anyServices.add(child); subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child, Constants.CHECK_KEY, String .valueOf(false )), listener); } } }); zkListener = listeners.get (listener); } .... } ZookeeperRegistry.lookup() public List <URL> lookup(URL url) { if (url == null ) { throw new IllegalArgumentException("lookup url == null" ); } try { List <String > providers = new ArrayList<>(); for (String path : toCategoriesPath(url)) { List <String > children = zkClient.getChildren(path); if (children != null ) { providers.addAll(children); } } return toUrlsWithoutEmpty(url, providers); } catch (Throwable e) { throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
源码中标注的1,2点,是不是也看到了我们的代码影子,第一点使用zookeeper的监听器,第二点根据url获得一个list的provider。只是dubbo获取的方式不一样而已。